/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.inlong.sort.cdc.mysql.source.utils;

import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitState;

import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecord.Fields;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.inlong.sort.base.Constants.CARET;
import static org.apache.inlong.sort.base.Constants.DDL_OP_ALTER;
import static org.apache.inlong.sort.base.Constants.DOLLAR;
import static org.apache.inlong.sort.base.Constants.GHOST_TAG;
import static org.apache.inlong.sort.base.Constants.TABLE_NAME;
import static org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;

/**
 * Utility class to deal gh-ost record.
 */
public class GhostUtils {

    /**
     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
     * will be captured and stored in the state.
     */
    public static void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord,
            String ghostTableRegex) {
        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
        if (StringUtils.isBlank(ddl)) {
            return;
        }
        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
        Pattern compile = Pattern.compile(ghostTableRegex);
        Matcher matcher = compile.matcher(tableName);
        if (matcher.find()) {
            tableName = matcher.group(1);
            TableId tableId = getTableId(element, tableName);
            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
                String matchTableInSqlRegex = ghostTableRegex;
                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
                }
                mySqlBinlogSplitState.recordTableDdl(
                        tableId,
                        ddl.replace(GHOST_TAG, "")
                                .replaceAll("\\s+", " ")
                                .replaceAll(matchTableInSqlRegex, tableName));
            }
        }
    }

    /**
     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where `a` is the captured table
     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and the tableName will contain
     * both the captured table and the gh-ost generated table.
     * We should retrieve the alter statements generated by gh-ost from the state and update the `source.table`
     * and `historyRecord` values of the element.
     */
    public static void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState,
            HistoryRecord historyRecord,
            String ghostTableRegex) {
        String tableNames = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
        for (String tableName : tableNames.split(",")) {
            Pattern compile = Pattern.compile(ghostTableRegex);
            Matcher matcher = compile.matcher(tableName);
            if (matcher.find()) {
                tableName = matcher.group(1);
                TableId tableId = getTableId(element, tableName);
                String ddl = splitState.asBinlogSplitState().getTableDdls().get(tableId);
                Struct value = (Struct) element.value();
                // update source.table and historyRecord
                value.getStruct(Fields.SOURCE).put(TABLE_NAME, tableName);
                value.put(HISTORY_RECORD_FIELD, historyRecord.document().set(Fields.DDL_STATEMENTS, ddl).toString());
            }
        }
    }

    /**
     * because the table name format generated by gh-ost is `_tableName_[gho|ghc|del]` (default),
     * we need to obtain its source table name `tableName` and generate the tableId of the source table.
     */
    private static TableId getTableId(SourceRecord element, String tableName) {
        String dbName = RecordUtils.getDbName(element);
        return RecordUtils.getTableId(
                dbName,
                tableName);
    }

}
